class BaseJobConfiguration(BaseModel):
    command: Optional[str] = Field(
        default=None,
        description=(
            "The command to use when starting a flow run. "
            "In most cases, this should be left blank and the command "
            "will be automatically generated by the worker."
        ),
    )
    env: Dict[str, Optional[str]] = Field(
        default_factory=dict,
        title="Environment Variables",
        description="Environment variables to set when starting a flow run.",
    )
    labels: Dict[str, str] = Field(
        default_factory=dict,
        description=(
            "Labels applied to infrastructure created by the worker using "
            "this job configuration."
        ),
    )
    name: Optional[str] = Field(
        default=None,
        description=(
            "Name given to infrastructure created by the worker using this "
            "job configuration."
        ),
    )

    _related_objects: Dict[str, Any] = PrivateAttr(default_factory=dict)

    @property
    def is_using_a_runner(self):
        return self.command is not None and "prefect flow-run execute" in self.command

    @validator("command")
    def _coerce_command(cls, v):
        return return_v_or_none(v)

    @staticmethod
    def _get_base_config_defaults(variables: dict) -> dict:
        """Get default values from base config for all variables that have them."""
        defaults = dict()
        for variable_name, attrs in variables.items():
            if "default" in attrs:
                defaults[variable_name] = attrs["default"]
        return defaults

    @classmethod
    @inject_client
    async def from_template_and_values(
        cls, base_job_template: dict, values: dict, client: "PrefectClient" = None
    ):
        """Creates a valid worker configuration object from the provided base
        configuration and overrides.

        Important: this method expects that the base_job_template was already
        validated server-side.
        """
        job_config: Dict[str, Any] = base_job_template["job_configuration"]
        variables_schema = base_job_template["variables"]
        variables = cls._get_base_config_defaults(
            variables_schema.get("properties", {})
        )
        variables.update(values)

        populated_configuration = apply_values(template=job_config, values=variables)
        populated_configuration = await resolve_block_document_references(
            template=populated_configuration, client=client
        )
        populated_configuration = await resolve_variables(
            template=populated_configuration, client=client
        )
        return cls(**populated_configuration)

    @classmethod
    def json_template(cls) -> dict:
        """Returns a dict with job configuration as keys and the corresponding
        templates as values

        Defaults to using the job configuration parameter name as the template
        variable name.

        e.g.
        {
            key1: '{{ key1 }}',      # default variable template
            key2: '{{ template2 }}', # `template2` specifically provided as template
        }
        """
        configuration = {}
        properties = cls.schema()["properties"]
        for k, v in properties.items():
            if v.get("template"):
                template = v["template"]
            else:
                template = "{{ " + k + " }}"
            configuration[k] = template

        return configuration

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        """
        Prepare the job configuration for a flow run.

        This method is called by the worker before starting a flow run. It
        should be used to set any configuration values that are dependent on
        the flow run.

        Args:
            flow_run: The flow run to be executed.
            deployment: The deployment that the flow run is associated with.
            flow: The flow that the flow run is associated with.
        """
        self._related_objects = {
            "deployment": deployment,
            "flow": flow,
            "flow-run": flow_run,
        }
        if deployment is not None:
            deployment_labels = self._base_deployment_labels(deployment)
        else:
            deployment_labels = {}

        if flow is not None:
            flow_labels = self._base_flow_labels(flow)
        else:
            flow_labels = {}

        env = {
            **self._base_environment(),
            **self._base_flow_run_environment(flow_run),
            **self.env,
        }
        self.env = {key: value for key, value in env.items() if value is not None}
        self.labels = {
            **self._base_flow_run_labels(flow_run),
            **deployment_labels,
            **flow_labels,
            **self.labels,
        }
        self.name = self.name or flow_run.name
        self.command = self.command or self._base_flow_run_command()

    @staticmethod
    def _base_flow_run_command() -> str:
        """
        Generate a command for a flow run job.
        """
        if experiment_enabled("enhanced_cancellation"):
            if (
                PREFECT_EXPERIMENTAL_WARN
                and PREFECT_EXPERIMENTAL_WARN_ENHANCED_CANCELLATION
            ):
                warnings.warn(
                    EXPERIMENTAL_WARNING.format(
                        feature="Enhanced flow run cancellation",
                        group="enhanced_cancellation",
                        help="",
                    ),
                    ExperimentalFeature,
                    stacklevel=3,
                )
            return "prefect flow-run execute"
        return "python -m prefect.engine"

    @staticmethod
    def _base_flow_run_labels(flow_run: "FlowRun") -> Dict[str, str]:
        """
        Generate a dictionary of labels for a flow run job.
        """
        return {
            "prefect.io/flow-run-id": str(flow_run.id),
            "prefect.io/flow-run-name": flow_run.name,
            "prefect.io/version": prefect.__version__,
        }

    @classmethod
    def _base_environment(cls) -> Dict[str, str]:
        """
        Environment variables that should be passed to all created infrastructure.

        These values should be overridable with the `env` field.
        """
        return get_current_settings().to_environment_variables(exclude_unset=True)

    @staticmethod
    def _base_flow_run_environment(flow_run: "FlowRun") -> Dict[str, str]:
        """
        Generate a dictionary of environment variables for a flow run job.
        """
        return {"PREFECT__FLOW_RUN_ID": str(flow_run.id)}

    @staticmethod
    def _base_deployment_labels(deployment: "DeploymentResponse") -> Dict[str, str]:
        labels = {
            "prefect.io/deployment-id": str(deployment.id),
            "prefect.io/deployment-name": deployment.name,
        }
        if deployment.updated is not None:
            labels["prefect.io/deployment-updated"] = deployment.updated.in_timezone(
                "utc"
            ).to_iso8601_string()
        return labels

    @staticmethod
    def _base_flow_labels(flow: "Flow") -> Dict[str, str]:
        return {
            "prefect.io/flow-id": str(flow.id),
            "prefect.io/flow-name": flow.name,
        }

    def _related_resources(self) -> List[RelatedResource]:
        tags = set()
        related = []

        for kind, obj in self._related_objects.items():
            if obj is None:
                continue
            if hasattr(obj, "tags"):
                tags.update(obj.tags)
            related.append(object_as_related_resource(kind=kind, role=kind, object=obj))

        return related + tags_as_related_resources(tags)
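To make the templating behavior of `from_template_and_values` concrete, the following is a minimal, illustrative sketch: defaults come from the base job template's "variables" schema, caller-supplied values override them, and each "{{ name }}" placeholder in "job_configuration" is replaced. The `fill` helper is hypothetical and far simpler than Prefect's `apply_values` (no nested templates, and no block document or variable resolution, which require an API client).

import re

base_job_template = {
    "job_configuration": {
        "command": "{{ command }}",
        "env": "{{ env }}",
        "image": "{{ image }}",
    },
    "variables": {
        "properties": {
            "command": {"default": None},
            "env": {"default": {}},
            "image": {"default": "prefecthq/prefect:2-latest"},
        }
    },
}

def fill(template, variables):
    # Replace whole-string "{{ name }}" placeholders with the variable's value.
    filled = {}
    for key, value in template.items():
        match = re.fullmatch(r"\{\{ (\w+) \}\}", value)
        filled[key] = variables.get(match.group(1)) if match else value
    return filled

# Start from the schema defaults, then apply user-supplied overrides.
variables = {
    name: schema["default"]
    for name, schema in base_job_template["variables"]["properties"].items()
    if "default" in schema
}
variables.update({"image": "my-registry/my-image:latest"})

print(fill(base_job_template["job_configuration"], variables))
# {'command': None, 'env': {}, 'image': 'my-registry/my-image:latest'}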
@register_base_type
class BaseWorker(abc.ABC):
    type: str
    job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration
    job_configuration_variables: Optional[Type[BaseVariables]] = None

    _documentation_url = ""
    _logo_url = ""
    _description = ""

    def __init__(
        self,
        work_pool_name: str,
        work_queues: Optional[List[str]] = None,
        name: Optional[str] = None,
        prefetch_seconds: Optional[float] = None,
        create_pool_if_not_found: bool = True,
        limit: Optional[int] = None,
        heartbeat_interval_seconds: Optional[int] = None,
        *,
        base_job_template: Optional[Dict[str, Any]] = None,
    ):
        """
        Base class for all Prefect workers.

        Args:
            name: The name of the worker. If not provided, a random one will be
                generated. If provided, it cannot contain '/' or '%'. The name is
                used to identify the worker in the UI; if two processes have the
                same name, they will be treated as the same worker.
            work_pool_name: The name of the work pool to poll.
            work_queues: A list of work queues to poll. If not provided, all
                work queues in the work pool will be polled.
            prefetch_seconds: The number of seconds to prefetch flow runs for.
            create_pool_if_not_found: Whether to create the work pool if it is
                not found. Defaults to `True`, but can be set to `False` to
                ensure that work pools are not created accidentally.
            limit: The maximum number of flow runs this worker should be running
                at a given time.
            base_job_template: If creating the work pool, provide the base job
                template to use. Logs a warning if the pool already exists.
        """
        if name and ("/" in name or "%" in name):
            raise ValueError("Worker name cannot contain '/' or '%'")
        self.name = name or f"{self.__class__.__name__} {uuid4()}"
        self._logger = get_logger(f"worker.{self.__class__.type}.{self.name.lower()}")

        self.is_setup = False
        self._create_pool_if_not_found = create_pool_if_not_found
        self._base_job_template = base_job_template
        self._work_pool_name = work_pool_name
        self._work_queues: Set[str] = set(work_queues) if work_queues else set()

        self._prefetch_seconds: float = (
            prefetch_seconds or PREFECT_WORKER_PREFETCH_SECONDS.value()
        )
        self.heartbeat_interval_seconds = (
            heartbeat_interval_seconds or PREFECT_WORKER_HEARTBEAT_SECONDS.value()
        )

        self._work_pool: Optional[WorkPool] = None
        self._runs_task_group: Optional[anyio.abc.TaskGroup] = None
        self._client: Optional[PrefectClient] = None
        self._last_polled_time: pendulum.DateTime = pendulum.now("utc")
        self._limit = limit
        self._limiter: Optional[anyio.CapacityLimiter] = None
        self._submitting_flow_run_ids = set()
        self._cancelling_flow_run_ids = set()
        self._scheduled_task_scopes = set()

    @classmethod
    def get_documentation_url(cls) -> str:
        return cls._documentation_url

    @classmethod
    def get_logo_url(cls) -> str:
        return cls._logo_url

    @classmethod
    def get_description(cls) -> str:
        return cls._description

    @classmethod
    def get_default_base_job_template(cls) -> Dict:
        if cls.job_configuration_variables is None:
            schema = cls.job_configuration.schema()
            # remove "template" key from all dicts in schema['properties'] because
            # it is not a relevant field
            for key, value in schema["properties"].items():
                if isinstance(value, dict):
                    schema["properties"][key].pop("template", None)
            variables_schema = schema
        else:
            variables_schema = cls.job_configuration_variables.schema()
        variables_schema.pop("title", None)
        return {
            "job_configuration": cls.job_configuration.json_template(),
            "variables": variables_schema,
        }

    @staticmethod
    def get_worker_class_from_type(type: str) -> Optional[Type["BaseWorker"]]:
        """
        Returns the worker class for a given worker type. If the worker type
        is not recognized, returns None.
        """
        load_prefect_collections()
        worker_registry = get_registry_for_type(BaseWorker)
        if worker_registry is not None:
            return worker_registry.get(type)

    @staticmethod
    def get_all_available_worker_types() -> List[str]:
        """
        Returns all worker types available in the local registry.
        """
        load_prefect_collections()
        worker_registry = get_registry_for_type(BaseWorker)
        if worker_registry is not None:
            return list(worker_registry.keys())
        return []

    def get_name_slug(self):
        return slugify(self.name)

    def get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter:
        return flow_run_logger(flow_run=flow_run).getChild(
            "worker",
            extra={
                "worker_name": self.name,
                "work_pool_name": (
                    self._work_pool_name if self._work_pool else "<unknown>"
                ),
                "work_pool_id": str(getattr(self._work_pool, "id", "unknown")),
            },
        )

    @abc.abstractmethod
    async def run(
        self,
        flow_run: "FlowRun",
        configuration: BaseJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> BaseWorkerResult:
        """
        Runs a given flow run on the current worker.
        """
        raise NotImplementedError(
            "Workers must implement a method for running submitted flow runs"
        )

    async def kill_infrastructure(
        self,
        infrastructure_pid: str,
        configuration: BaseJobConfiguration,
        grace_seconds: int = 30,
    ):
        """
        Method for killing infrastructure created by a worker. Should be implemented by
        individual workers if they support killing infrastructure.
        """
        raise NotImplementedError(
            "This worker does not support killing infrastructure."
        )

    @classmethod
    def __dispatch_key__(cls):
        if cls.__name__ == "BaseWorker":
            return None  # The base class is abstract
        return cls.type

    async def setup(self):
        """Prepares the worker to run."""
        self._logger.debug("Setting up worker...")
        self._runs_task_group = anyio.create_task_group()
        self._limiter = (
            anyio.CapacityLimiter(self._limit) if self._limit is not None else None
        )
        self._client = get_client()
        await self._client.__aenter__()
        await self._runs_task_group.__aenter__()

        self.is_setup = True

    async def teardown(self, *exc_info):
        """Cleans up resources after the worker is stopped."""
        self._logger.debug("Tearing down worker...")
        self.is_setup = False
        for scope in self._scheduled_task_scopes:
            scope.cancel()
        if self._runs_task_group:
            await self._runs_task_group.__aexit__(*exc_info)
        if self._client:
            await self._client.__aexit__(*exc_info)
        self._runs_task_group = None
        self._client = None

    def is_worker_still_polling(self, query_interval_seconds: int) -> bool:
        """
        This method is invoked by a webserver healthcheck handler
        and returns a boolean indicating if the worker has recorded a
        scheduled flow run poll within a variable amount of time.

        The `query_interval_seconds` is the same value that is used by
        the loop services - we will evaluate if the _last_polled_time
        was within that interval x 30 (so 10s -> 5m)

        The instance property `self._last_polled_time`
        is currently set/updated in `get_and_submit_flow_runs()`
        """
        threshold_seconds = query_interval_seconds * 30

        seconds_since_last_poll = (
            pendulum.now("utc") - self._last_polled_time
        ).in_seconds()

        is_still_polling = seconds_since_last_poll <= threshold_seconds

        if not is_still_polling:
            self._logger.error(
                f"Worker has not polled in the last {seconds_since_last_poll} seconds "
                "and should be restarted"
            )

        return is_still_polling

    async def get_and_submit_flow_runs(self):
        runs_response = await self._get_scheduled_flow_runs()

        self._last_polled_time = pendulum.now("utc")

        return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)

    async def check_for_cancelled_flow_runs(self):
        if not self.is_setup:
            raise RuntimeError(
                "Worker is not set up. Please make sure you are running this worker "
                "as an async context manager."
            )

        self._logger.debug("Checking for cancelled flow runs...")

        work_queue_filter = (
            WorkQueueFilter(name=WorkQueueFilterName(any_=list(self._work_queues)))
            if self._work_queues
            else None
        )

        named_cancelling_flow_runs = await self._client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLED]),
                    name=FlowRunFilterStateName(any_=["Cancelling"]),
                ),
                # Avoid duplicate cancellation calls
                id=FlowRunFilterId(not_any_=list(self._cancelling_flow_run_ids)),
            ),
            work_pool_filter=WorkPoolFilter(
                name=WorkPoolFilterName(any_=[self._work_pool_name])
            ),
            work_queue_filter=work_queue_filter,
        )

        typed_cancelling_flow_runs = await self._client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLING]),
                ),
                # Avoid duplicate cancellation calls
                id=FlowRunFilterId(not_any_=list(self._cancelling_flow_run_ids)),
            ),
            work_pool_filter=WorkPoolFilter(
                name=WorkPoolFilterName(any_=[self._work_pool_name])
            ),
            work_queue_filter=work_queue_filter,
        )

        cancelling_flow_runs = named_cancelling_flow_runs + typed_cancelling_flow_runs

        if cancelling_flow_runs:
            self._logger.info(
                f"Found {len(cancelling_flow_runs)} flow runs awaiting cancellation."
            )

        for flow_run in cancelling_flow_runs:
            self._cancelling_flow_run_ids.add(flow_run.id)
            self._runs_task_group.start_soon(self.cancel_run, flow_run)

        return cancelling_flow_runs

    async def cancel_run(self, flow_run: "FlowRun"):
        run_logger = self.get_flow_run_logger(flow_run)

        try:
            configuration = await self._get_configuration(flow_run)
        except ObjectNotFound:
            self._logger.warning(
                f"Flow run {flow_run.id!r} cannot be cancelled by this worker:"
                f" associated deployment {flow_run.deployment_id!r} does not exist."
            )
            await self._mark_flow_run_as_cancelled(
                flow_run,
                state_updates={
                    "message": (
                        "This flow run is missing infrastructure configuration information"
                        " and cancellation cannot be guaranteed."
                    )
                },
            )
            return
        else:
            if configuration.is_using_a_runner:
                self._logger.info(
                    f"Skipping cancellation because flow run {str(flow_run.id)!r} is"
                    " using enhanced cancellation. A dedicated runner will handle"
                    " cancellation."
                )
                return

        if not flow_run.infrastructure_pid:
            run_logger.error(
                f"Flow run '{flow_run.id}' does not have an infrastructure pid"
                " attached. Cancellation cannot be guaranteed."
            )
            await self._mark_flow_run_as_cancelled(
                flow_run,
                state_updates={
                    "message": (
                        "This flow run is missing infrastructure tracking information"
                        " and cancellation cannot be guaranteed."
                    )
                },
            )
            return

        try:
            await self.kill_infrastructure(
                infrastructure_pid=flow_run.infrastructure_pid,
                configuration=configuration,
            )
        except NotImplementedError:
            self._logger.error(
                f"Worker type {self.type!r} does not support killing created "
                "infrastructure. Cancellation cannot be guaranteed."
            )
        except InfrastructureNotFound as exc:
            self._logger.warning(f"{exc} Marking flow run as cancelled.")
            await self._mark_flow_run_as_cancelled(flow_run)
        except InfrastructureNotAvailable as exc:
            self._logger.warning(f"{exc} Flow run cannot be cancelled by this worker.")
        except Exception:
            run_logger.exception(
                "Encountered exception while killing infrastructure for flow run "
                f"'{flow_run.id}'. Flow run may not be cancelled."
            )
            # We will try again on generic exceptions
            self._cancelling_flow_run_ids.remove(flow_run.id)
            return
        else:
            self._emit_flow_run_cancelled_event(
                flow_run=flow_run, configuration=configuration
            )
            await self._mark_flow_run_as_cancelled(flow_run)
            run_logger.info(f"Cancelled flow run '{flow_run.id}'!")

    async def _update_local_work_pool_info(self):
        try:
            work_pool = await self._client.read_work_pool(
                work_pool_name=self._work_pool_name
            )
        except ObjectNotFound:
            if self._create_pool_if_not_found:
                wp = WorkPoolCreate(
                    name=self._work_pool_name,
                    type=self.type,
                )
                if self._base_job_template is not None:
                    wp.base_job_template = self._base_job_template

                work_pool = await self._client.create_work_pool(work_pool=wp)
                self._logger.info(f"Work pool {self._work_pool_name!r} created.")
            else:
                self._logger.warning(f"Work pool {self._work_pool_name!r} not found!")
                if self._base_job_template is not None:
                    self._logger.warning(
                        "Ignoring supplied base job template because the work pool"
                        " already exists"
                    )
                return

        # if the remote config type changes (or if it's being loaded for the
        # first time), check if it matches the local type and warn if not
        if getattr(self._work_pool, "type", 0) != work_pool.type:
            if work_pool.type != self.__class__.type:
                self._logger.warning(
                    "Worker type mismatch! This worker process expects type "
                    f"{self.type!r} but received {work_pool.type!r}"
                    " from the server. Unexpected behavior may occur."
                )

        # once the work pool is loaded, verify that it has a `base_job_template` and
        # set it if not
        if not work_pool.base_job_template:
            job_template = self.__class__.get_default_base_job_template()
            await self._set_work_pool_template(work_pool, job_template)
            work_pool.base_job_template = job_template

        self._work_pool = work_pool

    async def _send_worker_heartbeat(self):
        if self._work_pool:
            await self._client.send_worker_heartbeat(
                work_pool_name=self._work_pool_name,
                worker_name=self.name,
                heartbeat_interval_seconds=self.heartbeat_interval_seconds,
            )

    async def sync_with_backend(self):
        """
        Updates the worker's local information about it's current work pool and
        queues. Sends a worker heartbeat to the API.
        """
        await self._update_local_work_pool_info()

        await self._send_worker_heartbeat()

        self._logger.debug("Worker synchronized with the Prefect API server.")

    async def _get_scheduled_flow_runs(
        self,
    ) -> List["WorkerFlowRunResponse"]:
        """
        Retrieve scheduled flow runs from the work pool's queues.
        """
        scheduled_before = pendulum.now("utc").add(seconds=int(self._prefetch_seconds))
        self._logger.debug(
            f"Querying for flow runs scheduled before {scheduled_before}"
        )
        try:
            scheduled_flow_runs = (
                await self._client.get_scheduled_flow_runs_for_work_pool(
                    work_pool_name=self._work_pool_name,
                    scheduled_before=scheduled_before,
                    work_queue_names=list(self._work_queues),
                )
            )
            self._logger.debug(
                f"Discovered {len(scheduled_flow_runs)} scheduled_flow_runs"
            )
            return scheduled_flow_runs
        except ObjectNotFound:
            # the pool doesn't exist; it will be created on the next
            # heartbeat (or an appropriate warning will be logged)
            return []

    async def _submit_scheduled_flow_runs(
        self, flow_run_response: List["WorkerFlowRunResponse"]
    ) -> List["FlowRun"]:
        """
        Takes a list of WorkerFlowRunResponses and submits the referenced flow runs
        for execution by the worker.
        """
        submittable_flow_runs = [entry.flow_run for entry in flow_run_response]
        submittable_flow_runs.sort(key=lambda run: run.next_scheduled_start_time)
        for flow_run in submittable_flow_runs:
            if flow_run.id in self._submitting_flow_run_ids:
                continue

            try:
                if self._limiter:
                    self._limiter.acquire_on_behalf_of_nowait(flow_run.id)
            except anyio.WouldBlock:
                self._logger.info(
                    f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs"
                    " in progress."
                )
                break
            else:
                run_logger = self.get_flow_run_logger(flow_run)
                run_logger.info(
                    f"Worker '{self.name}' submitting flow run '{flow_run.id}'"
                )
                self._submitting_flow_run_ids.add(flow_run.id)
                self._runs_task_group.start_soon(
                    self._submit_run,
                    flow_run,
                )

        return list(
            filter(
                lambda run: run.id in self._submitting_flow_run_ids,
                submittable_flow_runs,
            )
        )

    async def _check_flow_run(self, flow_run: "FlowRun") -> None:
        """
        Performs a check on a submitted flow run to warn the user if the flow run
        was created from a deployment with a storage block.
        """
        if flow_run.deployment_id:
            assert (
                self._client and self._client._started
            ), "Client must be started to check flow run deployment."
            deployment = await self._client.read_deployment(flow_run.deployment_id)
            if deployment.storage_document_id:
                raise ValueError(
                    f"Flow run {flow_run.id!r} was created from deployment"
                    f" {deployment.name!r} which is configured with a storage block."
                    " Please use an agent to execute this flow run."
                )

    async def _submit_run(self, flow_run: "FlowRun") -> None:
        """
        Submits a given flow run for execution by the worker.
        """
        run_logger = self.get_flow_run_logger(flow_run)

        try:
            await self._check_flow_run(flow_run)
        except (ValueError, ObjectNotFound):
            self._logger.exception(
                (
                    "Flow run %s did not pass checks and will not be submitted for"
                    " execution"
                ),
                flow_run.id,
            )
            self._submitting_flow_run_ids.remove(flow_run.id)
            return

        ready_to_submit = await self._propose_pending_state(flow_run)

        if ready_to_submit:
            readiness_result = await self._runs_task_group.start(
                self._submit_run_and_capture_errors, flow_run
            )

            if readiness_result and not isinstance(readiness_result, Exception):
                try:
                    await self._client.update_flow_run(
                        flow_run_id=flow_run.id,
                        infrastructure_pid=str(readiness_result),
                    )
                except Exception:
                    run_logger.exception(
                        "An error occurred while setting the `infrastructure_pid` on "
                        f"flow run {flow_run.id!r}. The flow run will "
                        "not be cancellable."
                    )

            run_logger.info(f"Completed submission of flow run '{flow_run.id}'")
        else:
            # If the run is not ready to submit, release the concurrency slot
            if self._limiter:
                self._limiter.release_on_behalf_of(flow_run.id)

        self._submitting_flow_run_ids.remove(flow_run.id)

    async def _submit_run_and_capture_errors(
        self, flow_run: "FlowRun", task_status: Optional[anyio.abc.TaskStatus] = None
    ) -> Union[BaseWorkerResult, Exception]:
        run_logger = self.get_flow_run_logger(flow_run)

        try:
            configuration = await self._get_configuration(flow_run)
            submitted_event = self._emit_flow_run_submitted_event(configuration)
            result = await self.run(
                flow_run=flow_run,
                task_status=task_status,
                configuration=configuration,
            )
        except Exception as exc:
            if not task_status._future.done():
                # This flow run was being submitted and did not start successfully
                run_logger.exception(
                    f"Failed to submit flow run '{flow_run.id}' to infrastructure."
                )
                # Mark the task as started to prevent agent crash
                task_status.started(exc)
                await self._propose_crashed_state(
                    flow_run, "Flow run could not be submitted to infrastructure"
                )
            else:
                run_logger.exception(
                    f"An error occurred while monitoring flow run '{flow_run.id}'. "
                    "The flow run will not be marked as failed, but an issue may have "
                    "occurred."
                )
            return exc
        finally:
            if self._limiter:
                self._limiter.release_on_behalf_of(flow_run.id)

        if not task_status._future.done():
            run_logger.error(
                f"Infrastructure returned without reporting flow run '{flow_run.id}' "
                "as started or raising an error. This behavior is not expected and "
                "generally indicates improper implementation of infrastructure. The "
                "flow run will not be marked as failed, but an issue may have occurred."
            )
            # Mark the task as started to prevent agent crash
            task_status.started()

        if result.status_code != 0:
            await self._propose_crashed_state(
                flow_run,
                (
                    "Flow run infrastructure exited with non-zero status code"
                    f" {result.status_code}."
                ),
            )

        self._emit_flow_run_executed_event(result, configuration, submitted_event)

        return result

    def get_status(self):
        """
        Retrieves the status of the current worker including its name, current worker
        pool, the work pool queues it is polling, and its local settings.
        """
        return {
            "name": self.name,
            "work_pool": (
                self._work_pool.dict(json_compatible=True)
                if self._work_pool is not None
                else None
            ),
            "settings": {
                "prefetch_seconds": self._prefetch_seconds,
            },
        }

    async def _get_configuration(
        self,
        flow_run: "FlowRun",
    ) -> BaseJobConfiguration:
        deployment = await self._client.read_deployment(flow_run.deployment_id)
        flow = await self._client.read_flow(flow_run.flow_id)

        deployment_vars = deployment.job_variables or {}
        flow_run_vars = flow_run.job_variables or {}
        job_variables = {**deployment_vars, **flow_run_vars}

        configuration = await self.job_configuration.from_template_and_values(
            base_job_template=self._work_pool.base_job_template,
            values=job_variables,
            client=self._client,
        )
        configuration.prepare_for_flow_run(
            flow_run=flow_run, deployment=deployment, flow=flow
        )
        return configuration

    async def _propose_pending_state(self, flow_run: "FlowRun") -> bool:
        run_logger = self.get_flow_run_logger(flow_run)
        state = flow_run.state
        try:
            state = await propose_state(
                self._client, Pending(), flow_run_id=flow_run.id
            )
        except Abort as exc:
            run_logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}'. "
                    f"Server sent an abort signal: {exc}"
                ),
            )
            return False
        except Exception:
            run_logger.exception(
                f"Failed to update state of flow run '{flow_run.id}'",
            )
            return False

        if not state.is_pending():
            run_logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}': "
                    f"Server returned a non-pending state {state.type.value!r}"
                ),
            )
            return False

        return True

    async def _propose_failed_state(self, flow_run: "FlowRun", exc: Exception) -> None:
        run_logger = self.get_flow_run_logger(flow_run)
        try:
            await propose_state(
                self._client,
                await exception_to_failed_state(message="Submission failed.", exc=exc),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # We've already failed, no need to note the abort but we don't want it to
            # raise in the agent process
            pass
        except Exception:
            run_logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )

    async def _propose_crashed_state(self, flow_run: "FlowRun", message: str) -> None:
        run_logger = self.get_flow_run_logger(flow_run)
        try:
            state = await propose_state(
                self._client,
                Crashed(message=message),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # Flow run already marked as failed
            pass
        except Exception:
            run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'")
        else:
            if state.is_crashed():
                run_logger.info(
                    f"Reported flow run '{flow_run.id}' as crashed: {message}"
                )

    async def _mark_flow_run_as_cancelled(
        self, flow_run: "FlowRun", state_updates: Optional[dict] = None
    ) -> None:
        state_updates = state_updates or {}
        state_updates.setdefault("name", "Cancelled")
        state_updates.setdefault("type", StateType.CANCELLED)
        state = flow_run.state.copy(update=state_updates)

        await self._client.set_flow_run_state(flow_run.id, state, force=True)

        # Do not remove the flow run from the cancelling set immediately because
        # the API caches responses for the `read_flow_runs` and we do not want to
        # duplicate cancellations.
        await self._schedule_task(
            60 * 10, self._cancelling_flow_run_ids.remove, flow_run.id
        )

    async def _set_work_pool_template(self, work_pool, job_template):
        """Updates the `base_job_template` for the worker's work pool server side."""
        await self._client.update_work_pool(
            work_pool_name=work_pool.name,
            work_pool=WorkPoolUpdate(
                base_job_template=job_template,
            ),
        )

    async def _schedule_task(self, __in_seconds: int, fn, *args, **kwargs):
        """
        Schedule a background task to start after some time.

        These tasks will be run immediately when the worker exits instead of waiting.

        The function may be async or sync. Async functions will be awaited.
        """

        async def wrapper(task_status):
            # If we are shutting down, do not sleep; otherwise sleep until the
            # scheduled time or shutdown
            if self.is_setup:
                with anyio.CancelScope() as scope:
                    self._scheduled_task_scopes.add(scope)
                    task_status.started()
                    await anyio.sleep(__in_seconds)

                self._scheduled_task_scopes.remove(scope)
            else:
                task_status.started()

            result = fn(*args, **kwargs)
            if inspect.iscoroutine(result):
                await result

        await self._runs_task_group.start(wrapper)

    async def __aenter__(self):
        self._logger.debug("Entering worker context...")
        await self.setup()
        return self

    async def __aexit__(self, *exc_info):
        self._logger.debug("Exiting worker context...")
        await self.teardown(*exc_info)

    def __repr__(self):
        return f"Worker(pool={self._work_pool_name!r}, name={self.name!r})"

    def _event_resource(self):
        return {
            "prefect.resource.id": f"prefect.worker.{self.type}.{self.get_name_slug()}",
            "prefect.resource.name": self.name,
            "prefect.version": prefect.__version__,
            "prefect.worker-type": self.type,
        }

    def _event_related_resources(
        self,
        configuration: Optional[BaseJobConfiguration] = None,
        include_self: bool = False,
    ) -> List[RelatedResource]:
        related = []
        if configuration:
            related += configuration._related_resources()

        if self._work_pool:
            related.append(
                object_as_related_resource(
                    kind="work-pool", role="work-pool", object=self._work_pool
                )
            )

        if include_self:
            worker_resource = self._event_resource()
            worker_resource["prefect.resource.role"] = "worker"
            related.append(RelatedResource.parse_obj(worker_resource))

        return related

    def _emit_flow_run_submitted_event(
        self, configuration: BaseJobConfiguration
    ) -> Event:
        return emit_event(
            event="prefect.worker.submitted-flow-run",
            resource=self._event_resource(),
            related=self._event_related_resources(configuration=configuration),
        )

    def _emit_flow_run_executed_event(
        self,
        result: BaseWorkerResult,
        configuration: BaseJobConfiguration,
        submitted_event: Event,
    ):
        related = self._event_related_resources(configuration=configuration)

        for resource in related:
            if resource.role == "flow-run":
                resource["prefect.infrastructure.identifier"] = str(result.identifier)
                resource["prefect.infrastructure.status-code"] = str(result.status_code)

        emit_event(
            event="prefect.worker.executed-flow-run",
            resource=self._event_resource(),
            related=related,
            follows=submitted_event,
        )

    async def _emit_worker_started_event(self) -> Event:
        return emit_event(
            "prefect.worker.started",
            resource=self._event_resource(),
            related=self._event_related_resources(),
        )

    async def _emit_worker_stopped_event(self, started_event: Event):
        emit_event(
            "prefect.worker.stopped",
            resource=self._event_resource(),
            related=self._event_related_resources(),
            follows=started_event,
        )

    def _emit_flow_run_cancelled_event(
        self, flow_run: "FlowRun", configuration: BaseJobConfiguration
    ):
        related = self._event_related_resources(configuration=configuration)

        for resource in related:
            if resource.role == "flow-run":
                resource["prefect.infrastructure.identifier"] = str(
                    flow_run.infrastructure_pid
                )

        emit_event(
            event="prefect.worker.cancelled-flow-run",
            resource=self._event_resource(),
            related=related,
        )
`get_all_available_worker_types()`: Returns all worker types available in the local registry.
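Hypothetical usage sketch: the result depends on which worker collections are installed locally, but the core "process" type ships with Prefect 2.x.

from prefect.workers.base import BaseWorker

print(BaseWorker.get_all_available_worker_types())
# e.g. ['process', 'kubernetes', ...]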
`get_status()`: Retrieves the status of the current worker, including its name, current work pool, the work pool queues it is polling, and its local settings.
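A small sketch of the returned shape, assuming a Prefect 2.x environment and using the built-in process worker (any concrete subclass works); "my-pool" is a hypothetical pool name, the cached work pool is None until the first `sync_with_backend()` call, and `prefetch_seconds` falls back to the `PREFECT_WORKER_PREFETCH_SECONDS` setting.

from prefect.workers.process import ProcessWorker

worker = ProcessWorker(work_pool_name="my-pool")
print(worker.get_status())
# e.g. {'name': 'ProcessWorker <uuid>', 'work_pool': None,
#       'settings': {'prefetch_seconds': 10.0}}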
`get_worker_class_from_type(type)`: Returns the worker class for a given worker type. If the worker type is not recognized, returns `None`.
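Hypothetical lookup sketch: "process" resolves to the built-in process worker class in Prefect 2.x, while an unknown type returns None.

from prefect.workers.base import BaseWorker

print(BaseWorker.get_worker_class_from_type("process"))
print(BaseWorker.get_worker_class_from_type("not-a-real-worker-type"))  # None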
`is_worker_still_polling(query_interval_seconds)`: Invoked by a webserver healthcheck handler; returns a boolean indicating whether the worker has recorded a scheduled flow run poll within a variable amount of time. `query_interval_seconds` is the same value used by the loop services; the worker is considered to still be polling if `_last_polled_time` falls within that interval x 30 (so 10s -> 5m). The instance property `self._last_polled_time` is currently set/updated in `get_and_submit_flow_runs()`.
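The threshold arithmetic described above, spelled out:

query_interval_seconds = 10
threshold_seconds = query_interval_seconds * 30
print(threshold_seconds)  # 300 seconds, i.e. 5 minutes without a poll before
                          # the healthcheck reports the worker as unhealthy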
`kill_infrastructure(infrastructure_pid, configuration, grace_seconds=30)`: Method for killing infrastructure created by a worker. Should be implemented by individual workers if they support killing infrastructure; the base implementation raises `NotImplementedError`.
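A hedged sketch of what an override might look like for a worker that tracks the subprocesses it launched; `self._processes` is a hypothetical registry and not part of the base class. Raising `InfrastructureNotFound` lets `cancel_run` mark the flow run as cancelled when the infrastructure is already gone.

from prefect.exceptions import InfrastructureNotFound
from prefect.workers.base import BaseJobConfiguration


class ProcessKillingMixin:
    async def kill_infrastructure(
        self,
        infrastructure_pid: str,
        configuration: BaseJobConfiguration,
        grace_seconds: int = 30,
    ):
        # Look up the process we started for this flow run (hypothetical registry).
        process = self._processes.get(infrastructure_pid)
        if process is None:
            raise InfrastructureNotFound(
                f"No process {infrastructure_pid!r} is known to this worker."
            )
        process.terminate()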
`run(flow_run, configuration, task_status=None)`: Abstract method that runs a given flow run on the current worker. Must be implemented by worker subclasses.
`setup()`: Prepares the worker to run.
`sync_with_backend()`: Updates the worker's local information about its current work pool and queues, and sends a worker heartbeat to the API.
`teardown(*exc_info)`: Cleans up resources after the worker is stopped.
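To tie the pieces above together, here is a minimal sketch of a custom worker subclass, assuming a Prefect 2.x environment. `EchoWorker`, `EchoWorkerResult`, and the pool name are hypothetical; a real worker would create infrastructure, stream logs, and report the exit code it observed. Running `main` requires a reachable Prefect API.

from typing import Optional

import anyio
import anyio.abc

from prefect.client.schemas.objects import FlowRun
from prefect.workers.base import BaseJobConfiguration, BaseWorker, BaseWorkerResult


class EchoWorkerResult(BaseWorkerResult):
    """Result returned by EchoWorker.run."""


class EchoWorker(BaseWorker):
    type = "echo"
    job_configuration = BaseJobConfiguration
    _description = "Illustrative worker that only logs the command it would run."

    async def run(
        self,
        flow_run: FlowRun,
        configuration: BaseJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> EchoWorkerResult:
        self._logger.info("Would run: %s", configuration.command)
        if task_status:
            # Report an identifier for the created "infrastructure" so the
            # server can later target it for cancellation.
            task_status.started(f"echo-{flow_run.id}")
        return EchoWorkerResult(identifier=str(flow_run.id), status_code=0)


async def main():
    # Workers are async context managers: setup() opens the API client and task
    # group, teardown() closes them.
    async with EchoWorker(work_pool_name="my-pool") as worker:
        await worker.sync_with_backend()
        await worker.get_and_submit_flow_runs()


# anyio.run(main)  # uncomment to poll once against a running Prefect API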