Sean Harkins
04/17/2021, 1:43 AM
CloudHandler
via environment settings.
I used the appropriate environment settings in my run_config with
flow.run_config = ECSRun(
    image=worker_image,
    labels=["dask_test"],
    task_definition=definition,
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": "pangeo_forge.recipe"}
)
I have also configured the task’s regular Python logging with logging.getLogger("pangeo_forge.recipe").setLevel(level=logging.DEBUG)
and this logger is successfully writing to my CloudWatch logs for the Dask worker. But none of this module's log entries are being written to Prefect Cloud. Any suggestions on how I should configure things so that my Dask workers' log streams are also written to Cloud?
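For reference, the Prefect logging docs show PREFECT__LOGGING__EXTRA_LOGGERS as a stringified list of logger names rather than a bare module path, so a sketch of the run config might look like this (worker_image and definition assumed to be defined as above):

from prefect.run_configs import ECSRun

flow.run_config = ECSRun(
    image=worker_image,
    labels=["dask_test"],
    task_definition=definition,
    # EXTRA_LOGGERS takes a string that parses as a list of logger names
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe']"},
)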
Julio Venegas
04/17/2021, 4:09 PM
prefect register --project PROJECT_NAME --path PATH/TO/DIR_WITH_N_FLOWS
and even though all flows in the dir are registered and showing in the project, when I try to run them, the agent does not pick them up. If I register them individually with
cd PATH/TO/DIR_WITH_N_FLOWS
python flow1.py
python flow2.py
and I try to run them, they work, i.e. the agent picks them up.
Where do I get the appropriate logs and where do I share what I find?
Hawkar Mahmod
04/17/2021, 7:08 PM
Aurélien Vallée
04/18/2021, 6:00 AM
Avi Haiat
04/18/2021, 4:44 PM
range(0, count, LIMIT)
pseudo code:
@task
def count_collection_mongo(collectionName):
    # calculate number of rows in the collection and return it
    return count

@task
def load_data(collectionName, skip, limit):
    # data = db.collectionName.find().skip(skip).limit(limit)
    return data

@task
def transform(data):
    print(data.get('name'))
    return data

@task
def calculate_iterations_data(count_result):
    res = range(0, count_result, LIMIT)
    return res

with Flow("ETL flow for mycollection") as flow:
    result_count = count_collection_mongo("mycollection")
    iterations = calculate_iterations_data(result_count)
    data = load_data.map(iterations)
    transform.map(flatten(data))

flow.run()
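A minimal sketch of how the mapped call could carry the constant arguments, using Prefect's unmapped and flatten (the LIMIT value is a placeholder and the task bodies are the pseudocode above):

from prefect import Flow, flatten, unmapped

LIMIT = 1000  # placeholder page size

with Flow("ETL flow for mycollection") as flow:
    result_count = count_collection_mongo("mycollection")
    iterations = calculate_iterations_data(result_count)
    # map over the offsets; the collection name and limit stay constant
    data = load_data.map(unmapped("mycollection"), iterations, unmapped(LIMIT))
    # each load_data call returns a list of documents, so flatten before mapping
    transform.map(flatten(data))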
Avi Haiat
04/18/2021, 4:45 PM
Varun Joshi
04/19/2021, 3:56 AMfrom google.cloud import datacatalog_v1
from google.cloud import bigquery
client = bigquery.Client()
datacatalog_client = datacatalog_v1.DataCatalogClient()
I get an error saying that:
Clients have non-trivial state that is local and unpickleable
Can someone tell me what this means? I'm assuming this has something to do with GCP.
I'm using a local service account JSON file and also importing the Pub/Sub modules, and I didn't have any issue then.
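The error usually means the flow is trying to serialize client objects that were created at import time; one common workaround is to create the client inside the task so it never needs to be pickled. A minimal sketch (the query task and SQL argument are made up for illustration):

from google.cloud import bigquery
from prefect import task

@task
def run_query(sql: str):
    # Build the client inside the task run rather than at module level,
    # so it is never pickled along with the flow definition.
    client = bigquery.Client()
    return list(client.query(sql).result())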
Rob Fowler
04/19/2021, 11:46 AM
from time import sleep

import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.storage import Docker

@task
def slow_task(item, sleep_time=4):
    logger = prefect.context.get('logger')
    logger.info(f"==== IN TASK item {item} sleeping {sleep_time}")
    sleep(sleep_time)
    logger.info(f"## Awake {item}")
    return item

@prefect.task
def produce_range(rangelen):
    return range(rangelen)

with Flow("parallel") as flow:
    nrange = produce_range(6)
    numbers = slow_task.map(item=nrange)

flow.executor = LocalDaskExecutor(workers=10, scheduler="processes")
# flow.executor = DaskExecutor()

if 'local_script_path' in prefect.context:
    flow.storage = Docker(registry_url='old7.mianos.com:5000')

if __name__ == '__main__':
    flow.run()
I'd expect this to start 10 processes inside the running Docker container and return after a few seconds, but it executes each task serially unless I run it without any arguments to LocalDaskExecutor.
Under k8s, DaskExecutor seems to start a few threads, but nowhere near as many as I want (30+).
I feel I am missing something.
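For what it's worth, a sketch of pinning the worker count explicitly in Prefect 0.14.x (the counts are placeholders; num_workers and cluster_kwargs are the knobs I'd expect to matter here):

from prefect.executors import DaskExecutor, LocalDaskExecutor

# Multiprocessing scheduler with an explicit worker count
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=10)

# Or a temporary Dask cluster with an explicit size
# flow.executor = DaskExecutor(cluster_kwargs={"n_workers": 30, "threads_per_worker": 1})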
Milly gupta
04/19/2021, 1:24 PM
Jack Sundberg
04/19/2021, 3:37 PM
Jonathan Chu
04/20/2021, 12:13 AM
xyzz
04/20/2021, 8:45 AM
xyzz
04/20/2021, 8:57 AM
Rob Fowler
04/20/2021, 9:01 AM
Dotan Asselmann
04/20/2021, 1:37 PM
Pedro Machado
04/20/2021, 2:45 PM
CreateContainer, StartContainer, etc.).
However, it seems that the logs would need to be retrieved after the container finishes running with GetContainerLogs. Is this correct? If so, this is less than ideal for our use case, given that these are long-running processes and we'd want to see the logs in real time.
So far, I've thought about a couple of alternatives:
1. Modify the Docker Tasks to use the logs method with stream=True (I haven't tested this yet, but the docs suggest this could work)
2. Add Prefect to their Docker image and create a flow that runs inside of the image
Do you see another option? What would you recommend? Thanks!
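A minimal sketch of option 1 using docker-py directly, streaming logs while the container runs (the image name is a placeholder):

import docker

client = docker.from_env()
container = client.containers.run("my-long-running-image", detach=True)

# Stream log lines in real time while the container is still running
for line in container.logs(stream=True, follow=True):
    print(line.decode().rstrip())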
Kelby
04/20/2021, 3:22 PM
Kayvan Shah
04/20/2021, 6:29 PM
Mitchell Bregman
04/20/2021, 7:48 PM
service_account_name, but it seems the pods are still using the default service account for the namespace... my agent is also deployed with an undefined service_account_name... any thoughts?
flow.run_configs = KubernetesRun(
    image_pull_secrets=["xxx"],
    job_template=config.KUBE_JOB_TEMPLATE,
    cpu_limit=config.KUBE_CPU_LIMIT,
    cpu_request=config.KUBE_CPU_REQUEST,
    memory_limit=config.KUBE_MEMORY_LIMIT,
    memory_request=config.KUBE_MEMORY_REQUEST,
    service_account_name="my-service-account",
)
Gage Toschlog
04/20/2021, 9:14 PM
@task
def run_DBT_task():
    task = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    task.run(command="cd ~/src/prefect && dbt run")

with Flow(name="Local Test") as flow:
    run_DBT_task()

flowID = flow.register(project_name="51", labels=["localtest"])
client.create_flow_run(flow_id=flowID)
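For comparison, the more idiomatic pattern is usually to instantiate DbtShellTask once and call it inside the Flow context, rather than calling .run() inside another task; a minimal sketch reusing the settings above:

from prefect import Flow
from prefect.tasks.dbt import DbtShellTask

dbt_task = DbtShellTask(
    profile_name="snowflake",
    environment="sandbox",
    profiles_dir=".",
    stream_output=True,
    return_all=True,
)

with Flow(name="Local Test") as flow:
    # Calling the task inside the flow context makes it a proper flow task,
    # so its state and logs show up in the UI
    dbt_run = dbt_task(command="cd ~/src/prefect && dbt run")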
Aurélien Vallée
04/21/2021, 10:16 AM
@task
def mytask():
    logger = prefect.context.get('logger')
    logger.info('hello')

and

logger = logging.getLogger(__name__)

@task
def mytask():
    logger.info('hello')
i.e. what is the point of using a Prefect-provided logger instead of regular Python logging loggers?
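As I understand it, the practical difference is that the context logger is already wired to Prefect's handlers (including the Cloud handler), while a plain logging.getLogger(__name__) only reaches Cloud if it is listed in PREFECT__LOGGING__EXTRA_LOGGERS; Prefect also exposes a helper for a configured module-level logger. A rough sketch:

import prefect
from prefect.utilities.logging import get_logger

# Child of the "prefect" logger, so it inherits Prefect's handlers
module_logger = get_logger(__name__)

@prefect.task
def mytask():
    # The context logger also carries run metadata (flow run id, task name)
    prefect.context.get("logger").info("hello")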
Aurélien Vallée
04/21/2021, 10:17 AM
Aurélien Vallée
04/21/2021, 10:18 AM
Rob Fowler
04/21/2021, 10:32 AM
ciaran
04/21/2021, 11:55 AM
Jérémy Trudel
04/21/2021, 1:34 PM
Marc Lipoff
04/21/2021, 2:49 PM
prefect build -p /home/circleci/project/data_flows/flows/vaccine_cdc.py
(log in thread)
Kayvan Shah
04/21/2021, 4:35 PM
Sveta Kochkaeva
04/21/2021, 4:52 PM
Mark McDonald
04/21/2021, 5:18 PM
Mark McDonald
04/21/2021, 5:18 PM
nicholas
04/21/2021, 5:27 PM
Mark McDonald
04/21/2021, 5:27 PM
nicholas
04/21/2021, 5:28 PM
Mark McDonald
04/21/2021, 5:29 PM
nicholas
04/21/2021, 5:50 PM
Mark McDonald
04/21/2021, 9:38 PM
{
  flow_run(where: {flow_id: {_eq: flow_id}}, limit: 1, order_by: {created: asc}) {
    id
  }
}
2. I'd then check the flow_run_state for the most recent flow run and make sure that status is success, failed, or cancelled
{
  flow_run_state(where: {flow_run_id: {_eq: flow_run_id}}, limit: 1, order_by: {timestamp: desc}) {
    state, timestamp
  }
}
Does this make sense to you? Is there a better way of querying the GraphQL API to see if there are any active runs for a flow, regardless of flow version?
nicholas
04/21/2021, 9:59 PM
query {
  flow_run(
    where: {
      flow_id: { _eq: <<flow_id from context>> },
      flow_run_id: { _neq: <<flow_run_id from context>> },
      state: { _in: ["Running", "Submitted"] }
    }
  ) {
    id
    state
  }
}
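If it helps, a sketch of running a query like this from Python with the Prefect client (the flow_id value is a placeholder, and the filter fields should match whichever query you settle on):

from prefect.client import Client

client = Client()
result = client.graphql(
    """
    query($flow_id: uuid) {
      flow_run(
        where: {
          flow_id: { _eq: $flow_id },
          state: { _in: ["Running", "Submitted"] }
        }
      ) {
        id
        state
      }
    }
    """,
    variables={"flow_id": "<your-flow-id>"},
)
active_runs = result.data.flow_run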
Mark McDonald
04/21/2021, 10:01 PM
nicholas
04/22/2021, 12:52 PM